{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "provenance": []
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License."
      ],
      "metadata": {
        "id": "n2PtbQM61lFZ"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Compute and apply vocabulary on a dataset\n",
        "\n",
        "<table align=\"left\">\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/data_preprocessing/compute_and_apply_vocab.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\" />Run in Google Colab</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/data_preprocessing/compute_and_apply_vocab.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\" />View source on GitHub</a>\n",
        "  </td>\n",
        "</table>\n"
      ],
      "metadata": {
        "id": "ZUSiAR62SgO8"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "The [`ComputeAndApplyVocabulary`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.transforms.tft.html#apache_beam.ml.transforms.tft.ComputeAndApplyVocabulary) data processing transform computes a unique vocabulary from a dataset and then maps each word or token to a distinct integer index. Use this transform to change textual data into numerical representations for machine learning (ML) tasks.\n",
        "\n",
        "When you train ML models that use text data, generating a vocabulary on the incoming dataset is an important preprocessing step. By mapping words to numerical indices, the vocabulary reduces the complexity and dimensionality of the dataset. This step allows ML models to process the same words in a consistent way.\n",
        "\n",
        "This notebook shows how to use `MLTransform` to complete the following tasks:\n",
        "* Use `write` mode to generate a vocabulary on the input text and assign an index value to each token.\n",
        "* Use `read` mode to use the generated vocabulary and assign an index to a different dataset.\n",
        "\n",
        "`MLTransform` uses the `ComputeAndApplyVocabulary` transform, which is implemented by using `tensorflow_transform` to generate the vocabulary.\n",
        "\n",
        "For more information about using `MLTransform`, see [Preprocess data with MLTransform](https://beam.apache.org/documentation/ml/preprocess-data/) in the Apache Beam documentation."
      ],
      "metadata": {
        "id": "Ebs9JtTrqQZH"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Install the required modules\n",
        "\n",
        "To use `ComputeAndApplyVocabulary` with `MLTransform`, install `tensorflow_transform` and the Apache Beam SDK version 2.53.0 or later."
      ],
      "metadata": {
        "id": "BnOMWq9C1Yo0"
      }
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "P3uUPotJo7Dc"
      },
      "outputs": [],
      "source": [
        "! pip install 'apache_beam[interactive]>=2.53.0' --quiet\n",
        "! pip install tensorflow-transform --quiet"
      ]
    },
    {
      "cell_type": "code",
      "source": [
        "import os\n",
        "import tempfile\n",
        "import apache_beam as beam\n",
        "from apache_beam.ml.transforms.base import MLTransform\n",
        "from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary"
      ],
      "metadata": {
        "id": "e7-Vmawtq3xC"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Use the artifact location\n",
        "\n",
        "In `write` mode, the artifact location is used to store artifacts, such as the vocabulary file generated by `ComputeAndApplyVocabulary`.\n",
        "\n",
        "**NOTE**: The artifact location must be empty. If it isn't empty, a `RuntimeError` occurs.\n",
        "\n",
        "In `read` mode, `MLTransform` fetches artifacts from the specified artifact location. Pass the same artifact location that you used in `write` mode. Otherwise, a `RuntimeError` occurs or `MLTransform` produces unexpected results in `read` mode.\n"
      ],
      "metadata": {
        "id": "vfarBxAMFvRA"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "artifact_location = tempfile.mkdtemp(prefix='compute_and_apply_vocab_')\n",
        "artifact_location_with_frequency_threshold = tempfile.mkdtemp(prefix='compute_and_apply_vocab_frequency_threshold_')"
      ],
      "metadata": {
        "id": "wkVNu3SJrYnc"
      },
      "execution_count": null,
      "outputs": []
    },
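    {
      "cell_type": "markdown",
      "source": [
        "Because `write` mode expects an empty artifact location, it can help to check the directory before you run the pipeline. The following cell is a minimal sketch that assumes a local file system path. The helper name `is_empty_directory` and the `scratch_dir` directory are hypothetical and aren't part of Apache Beam."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "import os\n",
        "import tempfile\n",
        "\n",
        "def is_empty_directory(path):\n",
        "  # True only if the path is an existing directory that contains no entries.\n",
        "  return os.path.isdir(path) and not os.listdir(path)\n",
        "\n",
        "# Hypothetical scratch directory, separate from the artifact locations above.\n",
        "scratch_dir = tempfile.mkdtemp(prefix='artifact_location_check_')\n",
        "print(is_empty_directory(scratch_dir))\n",
        "\n",
        "# After a file is written, the directory no longer qualifies as empty.\n",
        "with open(os.path.join(scratch_dir, 'placeholder.txt'), 'w') as f:\n",
        "  f.write('not empty')\n",
        "print(is_empty_directory(scratch_dir))"
      ],
      "metadata": {},
      "execution_count": null,
      "outputs": []
    },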
    {
      "cell_type": "code",
      "source": [
        "documents = [\n",
        "    {\"feature\": \"the quick brown fox jumps over the lazy dog\"},\n",
        "    {\"feature\": \"the five boxing wizards jump quickly in the sky\"},\n",
        "    {\"feature\": \"dogs are running in the park\"},\n",
        "    {\"feature\": \"the quick brown fox\"}\n",
        "]"
      ],
      "metadata": {
        "id": "s3bYkvXlrnRt"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "In this example, in `write` mode, `MLTransform` uses `ComputeAndApplyVocabulary` to generate a vocabulary on the incoming dataset. The incoming text data is split into tokens on the space delimiter, and each token is assigned a unique index.\n",
        "\n",
        "The generated vocabulary is stored in the artifact location, so you can apply it to a different dataset in `read` mode."
      ],
      "metadata": {
        "id": "oETBJNVfRws_"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "with beam.Pipeline() as pipeline:\n",
        "  data_pcoll = pipeline | \"CreateData\" >> beam.Create(documents)\n",
        "  # Compute and apply vocabulary by using MLTransform.\n",
        "  transformed_pcoll = (\n",
        "      data_pcoll\n",
        "      | \"MLTransform\" >> MLTransform(write_artifact_location=artifact_location).with_transform(\n",
        "          ComputeAndApplyVocabulary(columns=['feature'], split_string_by_delimiter=' ', vocab_filename='vocab_index'))\n",
        "      )\n",
        "  transformed_pcoll | \"Print\" >> beam.Map(print)"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "6hAYN0ZYt2da",
        "outputId": "2b843ab1-9b72-4f31-d9b9-58c2e0c0fe75"
      },
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Row(feature=array([ 0,  1,  4,  3, 12, 10,  0, 11, 16]))\n",
            "Row(feature=array([ 0, 14, 17,  5, 13,  8,  2,  0,  6]))\n",
            "Row(feature=array([15, 18,  7,  2,  0,  9]))\n",
            "Row(feature=array([0, 1, 4, 3]))\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Understand and visualize vocabulary\n",
        "\n",
        "When working with text data in machine learning, one common step is the generation of a vocabulary index. `MLTransform` completes this step by using the `ComputeAndApplyVocabulary` transformation. Each unique word in your text data is assigned a specific index. This index is then used to represent the text in a numerical format, which is needed for machine learning algorithms.\n",
        "\n",
        "In this example, the `ComputeAndApplyVocabulary` transformation is applied to the `feature` column. A vocabulary index is created for each unique word found in this column.\n",
        "\n",
        "To visualize and understand this generated vocabulary, use the `ArtifactsFetcher` class. This class allows you to retrieve the vocabulary list from your specified location. When you have this list, you can see the index associated with each word in your vocabulary. This index corresponds to the numerical representation used in the transformation output of `ComputeAndApplyVocabulary`.\n",
        "\n",
        "Examine this vocabulary index to understand how your text data is being processed and represented numerically. This understanding is useful for debugging and improving machine learning models that rely on text data."
      ],
      "metadata": {
        "id": "hvTvzOw8iBi9"
      }
    },
    {
      "cell_type": "code",
      "source": [
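        "from apache_beam.ml.transforms.utils import ArtifactsFetcher\n",
        "artifact_fetcher = ArtifactsFetcher(artifact_location)\n",
        "# The column name is appended to the `vocab_filename` used in write mode,\n",
        "# so the artifact for the `feature` column is named 'vocab_index_feature'.\n",
        "vocab_list = artifact_fetcher.get_vocab_list(vocab_filename='vocab_index_feature')\n",
        "for i, token in enumerate(vocab_list):\n",
        "  print(f'{i}: {token}')"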
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "dzmd_4GaiACA",
        "outputId": "d73a5bee-15bc-4cb4-d2e0-59398a62cd60"
      },
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "0: the\n",
            "1: quick\n",
            "2: in\n",
            "3: fox\n",
            "4: brown\n",
            "5: wizards\n",
            "6: sky\n",
            "7: running\n",
            "8: quickly\n",
            "9: park\n",
            "10: over\n",
            "11: lazy\n",
            "12: jumps\n",
            "13: jump\n",
            "14: five\n",
            "15: dogs\n",
            "16: dog\n",
            "17: boxing\n",
            "18: are\n"
          ]
        }
      ]
    },
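    {
      "cell_type": "markdown",
      "source": [
        "The following pure-Python sketch illustrates the idea behind the vocabulary shown above: split each document on spaces, count the tokens, and assign indices in order of decreasing frequency. It isn't how `tensorflow_transform` is implemented. The helper names `build_vocab` and `encode` are hypothetical, and the tie-breaking rule (reverse lexicographic order among equally frequent tokens) is an assumption chosen only because it reproduces the output printed in this notebook."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "from collections import Counter\n",
        "\n",
        "# Same sentences as the `documents` list used in this notebook.\n",
        "sample_texts = [\n",
        "    'the quick brown fox jumps over the lazy dog',\n",
        "    'the five boxing wizards jump quickly in the sky',\n",
        "    'dogs are running in the park',\n",
        "    'the quick brown fox',\n",
        "]\n",
        "\n",
        "def build_vocab(texts, delimiter=' '):\n",
        "  # Count every token across all texts.\n",
        "  counts = Counter(token for text in texts for token in text.split(delimiter))\n",
        "  # Assumption: most frequent first, ties in reverse lexicographic order.\n",
        "  return sorted(counts, key=lambda token: (counts[token], token), reverse=True)\n",
        "\n",
        "def encode(text, vocab, delimiter=' ', default_value=-1):\n",
        "  # Map each token to its position in the vocabulary list.\n",
        "  index = {token: i for i, token in enumerate(vocab)}\n",
        "  return [index.get(token, default_value) for token in text.split(delimiter)]\n",
        "\n",
        "sketch_vocab = build_vocab(sample_texts)\n",
        "print(sketch_vocab[:5])  # ['the', 'quick', 'in', 'fox', 'brown']\n",
        "print(encode(sample_texts[0], sketch_vocab))  # [0, 1, 4, 3, 12, 10, 0, 11, 16]"
      ],
      "metadata": {},
      "execution_count": null,
      "outputs": []
    },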
    {
      "cell_type": "markdown",
      "source": [
        "## Set the frequency threshold\n",
        "\n",
        "The `frequency_threshold` parameter identifies the elements that appear frequently in the dataset. This parameter limits the generated vocabulary to elements with an absolute frequency greater than or equal to the specified threshold. If you don't specify the parameter, the entire vocabulary is generated.\n",
        "\n",
        "If the frequency of a vocabulary item is less than the threshold, it's assigned a default value. You can use the `default_value` parameter to set this value. Otherwise, it defaults to `-1`."
      ],
      "metadata": {
        "id": "oZZtAVk4wJCd"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "with beam.Pipeline() as pipeline:\n",
        "  data_pcoll = pipeline | \"CreateData\" >> beam.Create(documents)\n",
        "  # Compute and apply vocabulary by using MLTransform.\n",
        "  transformed_pcoll = (\n",
        "      data_pcoll\n",
        "      | \"MLTransform\" >> MLTransform(write_artifact_location=artifact_location_with_frequency_threshold).with_transform(\n",
        "          ComputeAndApplyVocabulary(columns=['feature'], split_string_by_delimiter=' ', frequency_threshold=2, vocab_filename='vocab_index'))\n",
        "      )\n",
        "  transformed_pcoll | \"Print\" >> beam.Map(print)"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "FpDkOSWtv1pn",
        "outputId": "ff74a769-534c-4f40-fc59-bf4a10eaea7c"
      },
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Row(feature=array([ 0,  1,  4,  3, -1, -1,  0, -1, -1]))\n",
            "Row(feature=array([ 0, -1, -1, -1, -1, -1,  2,  0, -1]))\n",
            "Row(feature=array([-1, -1, -1,  2,  0, -1]))\n",
            "Row(feature=array([0, 1, 4, 3]))\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "In the output, any token whose frequency is less than the specified threshold is assigned the `default_value` of `-1`. Only the remaining tokens are written to the generated vocabulary file."
      ],
      "metadata": {
        "id": "h1s4a6hzxKrb"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "from apache_beam.ml.transforms.utils import ArtifactsFetcher\n",
        "artifact_fetcher = ArtifactsFetcher(artifact_location_with_frequency_threshold)\n",
        "vocab_list = artifact_fetcher.get_vocab_list(vocab_filename='vocab_index_feature')\n",
        "for i, token in enumerate(vocab_list):\n",
        "  print(f'{i}: {token}')"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "rwgthpj9x8kp",
        "outputId": "e5fcdd51-75c1-4da1-d5a4-d7f39129cc85"
      },
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "0: the\n",
            "1: quick\n",
            "2: in\n",
            "3: fox\n",
            "4: brown\n"
          ]
        }
      ]
    },
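    {
      "cell_type": "markdown",
      "source": [
        "To make the effect of the threshold concrete, the following pure-Python sketch keeps only tokens that appear at least `frequency_threshold` times and maps every other token to `-1`. It's an illustration of the behavior described in this section, not the `tensorflow_transform` implementation. The helper name `build_thresholded_vocab` is hypothetical, and the ordering of equally frequent tokens is an assumption chosen to match the vocabulary printed above."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "from collections import Counter\n",
        "\n",
        "# Same sentences as the `documents` list used in this notebook.\n",
        "sample_texts = [\n",
        "    'the quick brown fox jumps over the lazy dog',\n",
        "    'the five boxing wizards jump quickly in the sky',\n",
        "    'dogs are running in the park',\n",
        "    'the quick brown fox',\n",
        "]\n",
        "\n",
        "def build_thresholded_vocab(texts, frequency_threshold, delimiter=' '):\n",
        "  counts = Counter(token for text in texts for token in text.split(delimiter))\n",
        "  # Keep tokens whose absolute frequency is at least the threshold.\n",
        "  kept = [token for token in counts if counts[token] >= frequency_threshold]\n",
        "  # Assumption: most frequent first, ties in reverse lexicographic order.\n",
        "  return sorted(kept, key=lambda token: (counts[token], token), reverse=True)\n",
        "\n",
        "thresholded_vocab = build_thresholded_vocab(sample_texts, frequency_threshold=2)\n",
        "token_to_index = {token: i for i, token in enumerate(thresholded_vocab)}\n",
        "# Tokens that were dropped from the vocabulary fall back to -1.\n",
        "encoded = [token_to_index.get(token, -1) for token in sample_texts[2].split(' ')]\n",
        "print(thresholded_vocab)  # ['the', 'quick', 'in', 'fox', 'brown']\n",
        "print(encoded)  # [-1, -1, -1, 2, 0, -1]"
      ],
      "metadata": {},
      "execution_count": null,
      "outputs": []
    },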
    {
      "cell_type": "markdown",
      "source": [
        "## Use MLTransform for inference workloads\n",
        "\n",
        "When `MLTransform` is in `write` mode, it produces artifacts, such as vocabulary files for `ComputeAndApplyVocabulary`. These artifacts allow you to apply the same vocabulary, and any other preprocessing transforms, when you train your model and serve it in production, or when you test its accuracy.\n",
        "\n",
        "When `MLTransform` is used in `read` mode, it uses the previously generated vocabulary files to map the incoming text data. If an incoming token isn't found in the generated vocabulary, the token is mapped to the `default_value` provided during `write` mode. In this case, the `default_value` is `-1`."
      ],
      "metadata": {
        "id": "PEqYpCexybO4"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "test_documents = [\n",
        "    {'feature': 'wizards are flying in the sky'},\n",
        "    {'feature': 'I love dogs'}\n",
        "]\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  data_pcoll = pipeline | \"CreateData\" >> beam.Create(test_documents)\n",
        "  # Apply the previously computed vocabulary by using MLTransform in read mode.\n",
        "  transformed_pcoll = (\n",
        "      data_pcoll\n",
        "      | \"MLTransform\" >> MLTransform(read_artifact_location=artifact_location))\n",
        "\n",
        "  transformed_pcoll | \"Print\" >> beam.Map(print)"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "x-LRXUaTyX_B",
        "outputId": "0e0b04df-a684-49c9-fe72-a032c4726078"
      },
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Row(feature=array([ 5, 18, -1,  2,  0,  6]))\n",
            "Row(feature=array([-1, -1, 15]))\n"
          ]
        }
      ]
    },
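    {
      "cell_type": "markdown",
      "source": [
        "To check the `read` mode output by eye, you can map the indices back to tokens. The following sketch hardcodes the vocabulary that was printed earlier in this notebook instead of fetching it, so it runs on its own. The helper name `decode` and the `<unknown>` placeholder are hypothetical and aren't part of Apache Beam."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "# Vocabulary copied from the ArtifactsFetcher output shown earlier in this notebook.\n",
        "known_vocab = [\n",
        "    'the', 'quick', 'in', 'fox', 'brown', 'wizards', 'sky', 'running',\n",
        "    'quickly', 'park', 'over', 'lazy', 'jumps', 'jump', 'five', 'dogs',\n",
        "    'dog', 'boxing', 'are',\n",
        "]\n",
        "\n",
        "def decode(indices, vocab, unknown_token='<unknown>'):\n",
        "  # Indices outside the vocabulary, such as the default value -1, have no token.\n",
        "  return [vocab[i] if 0 <= i < len(vocab) else unknown_token for i in indices]\n",
        "\n",
        "print(decode([5, 18, -1, 2, 0, 6], known_vocab))\n",
        "print(decode([-1, -1, 15], known_vocab))"
      ],
      "metadata": {},
      "execution_count": null,
      "outputs": []
    },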
    {
      "cell_type": "markdown",
      "source": [
        "When you specify `read_artifact_location`, you don't have to pass any transforms to `MLTransform`. Instead, `MLTransform` loads the artifacts and the transforms that were saved to the location specified by `write_artifact_location`."
      ],
      "metadata": {
        "id": "qwqH283F0kee"
      }
    }
  ]
}
